package org.qy.rocketmq.cs.service.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.qy.rocketmq.cs.rocket.MQExecutionProcessor;
import org.qy.rocketmq.cs.rocket.MQFunction;
import org.qy.rocketmq.cs.rocket.listener.ProducerTransactionListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * All rights Reserved, Designed By www.cu-sc.com
 *
 * @Title:  ProducerTransactionListenerImpl.java
 * @Package org.qy.rocketmq.cs.service.producer
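 * @Description: Producer-side transaction listener: runs the local transaction supplied with a transactional message and answers transaction state checks.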
 * @Since: JDK 1.8
 * @Author: fangyukang
 * @Email: fangyk@cu-sc.com
 * @Version: v1.0.0
 * @Date: 2020/7/1 22:21
 * @Copyright: 2020 www.cu-sc.com All rights reserved. <br/>
 * 注意：本内容仅限于联通集团内部传阅，禁止外泄以及用于其他的商业目的<br/>
 */
@Slf4j
@Component
public class ProducerTransactionListenerImpl implements ProducerTransactionListener {

    /**
     * Executes the local transaction that accompanies a transactional message.
     *
     * <p>The transaction argument is expected to be an {@link MQExecutionProcessor}. Its
     * {@link MQFunction}, when present, is invoked; a {@link RuntimeException} raised by it is
     * recorded on the processor (result {@code false} plus the exception) and the message is
     * rolled back. A processor without a function is treated as successful.
     *
     * <p>Notice: for internal circulation within China Unicom Smart Connection Technology
     * Co., Ltd. only; disclosure or use for other commercial purposes is prohibited.
     *
     * @param message the transactional message being sent
     * @param o       the transaction argument; expected to be an {@link MQExecutionProcessor}
     * @return {@code COMMIT_MESSAGE} when the local function completes (or there is none),
     *         {@code ROLLBACK_MESSAGE} when it throws a {@link RuntimeException},
     *         {@code UNKNOW} when the argument is {@code null} or of an unexpected type
     * @author fangyukang
     * @version v1.0.0 (2020/7/1)
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("ProducerTransactionListenerImpl|LocalTransactionState|execute mq transaction topic: {} tags: {} transactionId:{}", message.getTopic(), message.getTags() , message.getTransactionId());

        // Guard the argument up front: a null or foreign argument used to surface as an
        // NPE raised from inside the catch handler, or as a ClassCastException.
        // NOTE(review): UNKNOW is chosen because the RocketMQ producer appears to treat an
        // exception from this callback as an undetermined state - confirm against the client version in use.
        if (!(o instanceof MQExecutionProcessor)) {
            log.warn("ProducerTransactionListenerImpl|LocalTransactionState|mq transactionId:{} unexpected transaction argument: {}", message.getTransactionId(), o);
            return LocalTransactionState.UNKNOW;
        }

        MQExecutionProcessor executionProcessor = (MQExecutionProcessor) o;
        try {
            MQFunction fun = executionProcessor.getFun();
            if (fun != null) {
                fun.apply();
            }
        } catch (RuntimeException e) {
            // Pass the exception as the trailing argument so the stack trace is logged.
            log.warn("ProducerTransactionListenerImpl|LocalTransactionState|mq transactionId:{} ROLLBACK_MESSAGE", message.getTransactionId(), e);
            // Mark the execution as failed and hand the cause back through the processor.
            executionProcessor.setResult(false);
            executionProcessor.setE(e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * Answers a transaction state check for a previously sent transactional message.
     *
     * <p>No tag-specific check logic is implemented, so every message is reported as
     * {@code UNKNOW} and a warning is logged. When business checks are added, dispatch on
     * {@code messageExt.getTags()} here and remember that tags may be {@code null}.
     *
     * <p>Notice: for internal circulation within China Unicom Smart Connection Technology
     * Co., Ltd. only; disclosure or use for other commercial purposes is prohibited.
     *
     * @param messageExt the message whose local transaction state is being checked
     * @return always {@code UNKNOW}
     * @author fangyukang
     * @version v1.0.0 (2020/7/1)
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        final String topic = messageExt.getTopic();
        final String tags = messageExt.getTags();

        // The former switch had only a default branch and threw an NPE on null tags;
        // the body was decoded but never used. Both are dropped without changing the result.
        final LocalTransactionState state = LocalTransactionState.UNKNOW;
        log.warn("ProducerTransactionListenerImpl|LocalTransactionState|MQ producer listener 没有对应业务处理 topic:{}  tags:{}", topic, tags);

        log.info("ProducerTransactionListenerImpl|LocalTransactionState|check mq transaction topic: {} tags: {} msgId:{} state:{}", topic, tags , messageExt.getMsgId(), state.name());
        return state;
    }
}
